HEX
Server: LiteSpeed
System: Linux eticaretsrv4.isimtescil.net 3.10.0-962.3.2.lve1.5.26.7.el7.x86_64 #1 SMP Wed Oct 2 07:53:12 EDT 2019 x86_64
User: sioberen (1086)
PHP: 7.3.33
Disabled: NONE
Upload Files
File: //opt/alt/python37/lib/python3.7/site-packages/clselect/cluseroptselect.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
import os
import base64
import re

from builtins import map
from future.utils import iteritems

from .cluserextselect import ClUserExtSelect
from .clselectexcept import ClSelectExcept
from clcommon import clcaptain
from . import utils
from xml.sax.saxutils import unescape
from clcommon.utils import ExternalProgramFailed
from clcommon.php_conf_reader import PhpConfReader, PhpConfBaseException,\
    PhpConfReadError, PhpConfLoadException, PhpConfNoSuchAlternativeException


class ClUserOptSelect(ClUserExtSelect):
    """
    Class for processing user options
    """
    OPTIONS_PATH = '/etc/cl.selector/php.conf'

    def __init__(self, item='php', exclude_pid_list=None):
        """
        Initialises the selector and its per-instance caches
        @param item: string, forwarded to ClUserExtSelect.__init__
        @param exclude_pid_list: forwarded unchanged to ClUserExtSelect.__init__
        """
        ClUserExtSelect.__init__(self, item, exclude_pid_list)
        # Directive data; populated by _load_whitelist on first use
        self._whitelist = {}
        self._user_excludes = set()
        # Plain character -> HTML entity. The entity strings must stay
        # spelled out literally: a decoded '&quot;' turns into a bare quote
        # and breaks the string literal.
        self._html_escape_table = {" ": "&nbsp;", '"': "&quot;", "'": "&apos;",
                                   ">": "&gt;", "<": "&lt;", "&": "&amp;"}
        # Inverse map (entity -> character), handed to saxutils.unescape
        # when PHP output is parsed in _get_ini_defaults
        self._html_unescape_table = {v: k for k, v in iteritems(self._html_escape_table)}

    def insert_options(self, user, version,
                       optset, decoder, append=False, quiet=True, create=True):
        """
        Parses an option string, filters it against the whitelist
        and stores the result via insert_json_options
        @param user: string
        @param version: string
        @param optset: string, comma separated 'name:value' items
        @param decoder: string, decoder name passed to _process_option_string
        @param append: bool, forwarded to insert_json_options
        @param quiet: bool, skip forbidden options instead of raising
        @param create: bool, forwarded to insert_json_options
        """
        if optset == '':
            # Nothing to parse; the whitelist is not consulted in this case
            parsed = {}
        else:
            parsed = self._remove_forbidden_options(
                self._process_option_string(
                    optset=optset, decoder=decoder, expect_separator=True),
                version, quiet)
        self.insert_json_options(user, version, parsed, append, create)

    def insert_json_options(self, user, version,
                       options, append=False, create=True):
        """
        Writes the given options into the user's ini file,
        reloads the user's processes and refreshes the backup copy
        @param user: string
        @param version: string
        @param options: dict, directive name -> value
        @param append: bool, merge into the stored options instead of replacing them
        @param create: bool, forwarded to _write_to_file and _backup_settings
        """
        self._check_user_in_cagefs(user)
        ini_path = self._compose_user_ini_path(user, version)
        raw_lines, extensions, extensions_data = self._load_ini_contents(ini_path)
        stored = self._prepare_options_data(raw_lines)
        if append:
            stored.update(options)
            effective = stored
        else:
            effective = options

        options_set = self._compose_options_set(effective)
        if options_set:
            # Markers are only added when there is something to wrap
            options_set = self._wrap_options(options_set)
        output = self._compose_output_data(
            options_set, extensions, extensions_data)

        # The 'no value' placeholder is written out as an empty directive value
        for pos, line in enumerate(output):
            pieces = line.split('=')
            if len(pieces) == 2 and pieces[1] == 'no value':
                output[pos] = pieces[0] + '='

        text = '\n'.join(output).rstrip() + '\n'
        self._write_to_file(user, text, ini_path, create)
        self._reload_processes(user)
        self._backup_settings(user, version, options_set, create)

    def delete_options(self, user, version,
                       optset, decoder, quiet=True):
        """
        Removes the named options from the user's ini file
        @param user: string
        @param version: string
        @param optset: string, comma separated option names
                       (a ':value' suffix is accepted but not required)
        @param decoder: string, decoder name passed to _process_option_string
        @param quiet: bool, not used by this method
        """
        to_remove = self._process_option_string(
            optset=optset, decoder=decoder, expect_separator=False)
        self._check_user_in_cagefs(user)
        ini_path = self._compose_user_ini_path(user, version)
        raw_lines, extensions, extensions_data = self._load_ini_contents(ini_path)
        stored = self._prepare_options_data(raw_lines)
        remaining = {name: value for name, value in stored.items()
                     if name not in to_remove}
        # Unlike insert_json_options, the markers are written
        # even when no option is left
        options_set = self._wrap_options(self._compose_options_set(remaining))
        output = self._compose_output_data(
            options_set, extensions, extensions_data)
        text = '\n'.join(output).rstrip() + '\n'
        self._write_to_file(user, text, ini_path)
        self._reload_processes(user)
        self._backup_settings(user, version, options_set)

    def get_options(self, user, version=None):
        """
        Collects the whitelist data for a user: interpreter defaults
        first, then the values from the user's ini file
        @param user: string
        @param version: string, taken from get_version(user) when empty
        @return: dict
        """
        selected = version or self.get_version(user)[0]
        if selected == 'native':
            raise ClSelectExcept.UnableToGetExtensions(selected)
        # Order matters: user values are applied on top of the defaults
        self._get_ini_defaults(selected)
        self._get_user_ini(user, selected)
        return self._get_whitelist(selected)

    def reset_options(self, users=None, versions=None):
        """
        Drops all custom options by writing an empty option set
        @param users: list, restrict to these users (all users when empty)
        @param versions: list, restrict to these versions (all versions when empty)
        """
        def _selected(value, allowed):
            # An empty/None filter means 'no restriction'
            return not allowed or value in allowed

        all_users = self.list_all_users()
        alternatives = self.get_all_alternatives_data()
        for version in alternatives:
            if not _selected(version, versions):
                continue
            for user in all_users:
                if not _selected(user, users):
                    continue
                try:
                    self.insert_options(
                        user=user, version=version, optset='',
                        decoder='plain', append=False, quiet=True,
                        create=False)
                except ClSelectExcept.NotCageFSUser:
                    # Users outside CageFS are skipped silently
                    pass

    def _prepare_options_data(self, contents):
        options = {}
        for item in contents:
            if item.strip() == "":
                continue
            if item.startswith(';>===') or item.startswith(';<==='):
                continue
            key, value = list(map((lambda x:x.strip()), item.split('=', 1)))
            if value == '':
                value = 'no value'
            options.update({key: value})
        return options

    def _get_whitelist(self, version):
        """
        Returns whitelist data
        """
        if not self._whitelist:
            self._load_whitelist(version)
        return self._whitelist

    def _load_whitelist(self, version):
        """
        Reads the selector config (OPTIONS_PATH, not php.ini!)
        and merges the directives for the given version into the whitelist
        @param version: string, short PHP version; must contain a dot
        """
        alternatives = self.get_all_alternatives_data()
        self._check_alternative(version, alternatives)
        if '.' not in version:
            raise ClSelectExcept.UnableToGetExtensions(version)
        # Short to full PHP version map. Example: {'4.4': '4.4.9'}
        short_to_full = {short_ver: ver_data['version']
                         for short_ver, ver_data in alternatives.items()}
        try:
            reader = PhpConfReader(self.OPTIONS_PATH)
            self._whitelist.update(
                reader.get_config_for_selectorctl(version, short_to_full))
        except PhpConfNoSuchAlternativeException as e:
            raise ClSelectExcept.UnableToGetExtensions(e.php_version)
        except (PhpConfReadError, PhpConfLoadException, PhpConfBaseException) as e:
            raise ClSelectExcept.UnableToLoadData(self.OPTIONS_PATH, str(e))
    def _handle_option_item(option_item, expect_separator=True):
        """
        Splits options data into key-value pair and returns it
        @param option_item: string
        @param expect_separator: bool
        @return: dict
        """
        if ':' in option_item:
            option_name, option_value = option_item.split(':', 1)
        else:
            if not expect_separator:
                option_name, option_value = option_item, ''
            else:
                raise ClSelectExcept.WrongData(
                    "Colon as a separator expected (%s)!" % (option_item,))
        return {option_name: option_value}
    _handle_option_item = staticmethod(_handle_option_item)

    def _decoder(data, decoder='plain'):
        """
        Decodes option item
        @param data: string
        @param decoder: string
        @return: string
        """
        dispatcher = {
            'plain': (lambda x: x),
            'base64': (lambda x: base64.b64decode(x))}
        try:
            return dispatcher[decoder](data)
        except KeyError:
            return dispatcher['plain'](data)
    _decoder = staticmethod(_decoder)

    def _process_option_string(cls, optset, decoder='plain', expect_separator=True):
        """
        Wrapper around options parsing routines
        @param optset: string
        @param decoder: callback name
        @expect_separator: bool
        @return: dict
        """
        options = {}
        if optset:
            for option_item in optset.split(','):
                option_item = cls._decoder(option_item, decoder)
                options.update(
                    cls._handle_option_item(
                        option_item, expect_separator))
        return options
    _process_option_string = classmethod(_process_option_string)

    def _remove_forbidden_options(self, options, version, quiet=True):
        """
        Check if all options to process are present in white list
        and removes forbidden ones or raise an exception
        @param options: dict
        @param quiet: bool
        @return: dict
        """
        whitelist = self._get_whitelist(version)
        if not set(options.keys()).issubset(set(whitelist.keys())):
            white_list_options = {}
            for opt_name, opt_value in iteritems(options):
                if opt_name not in whitelist:
                    if quiet:
                        continue
                    else:
                        raise ClSelectExcept.UnableToProcessOption(opt_name)
                white_list_options[opt_name] = opt_value
            options = white_list_options
        return options

    def _compose_options_set(options):
        """
        Construct option item from key and value pair
        @param options: dict
        return: list
        """
        options_set = []
        for opt_name, opt_value in iteritems(options):
            options_set.append("%s=%s" % (opt_name, opt_value))
        return options_set
    _compose_options_set = staticmethod(_compose_options_set)

    def _wrap_options(self, contents):
        """
        Adds identifying string before and after dataset
        @param contents: list
        """
        data = [';>=== Start of PHP Selector Custom Options ===']
        data.extend(contents)
        data.append(';<=== End of PHP Selector Custom Options =====')
        return data

    def _compose_output_data(contents, extensions, extensions_data):
        """
        Construct output
        @param contents: list
        @param extensions: list
        @param extensions_data: dict
        return: list
        """
        data = []
        for item in extensions:
            data.extend(extensions_data[item])
            # Add two spacelines between each extension
            data.extend(["", ""])

        data.extend(contents)
        return data
    _compose_output_data = staticmethod(_compose_output_data)

    def _check_version(self, test, version):
        """
        Compares version in use and version required by PHP feature
        and return true if PHP feature satisfies
        """
        alternatives = self.get_all_alternatives_data()
        self._check_alternative(version, alternatives)
        if '.' not in version:
            raise ClSelectExcept.UnableToGetExtensions(version)
        v_array = list(map((lambda x: int(x)), alternatives[version]['version'].split('.')))
        # if test has 2 section, add third
        if len(test.split('.')) == 2:
            test += '.0'
        patt = re.compile(r'([<>=]{1,2})?(\d+\.\d+\.\d+)\.?')
        m = patt.match(test)
        if not m:
            raise ClSelectExcept.NoSuchAlternativeVersion(test)
        action = m.group(1)
        test = list(map((lambda x: int(x)), m.group(2).split('.')))
        version_int = v_array[0] << 11 | v_array[1] << 7 | v_array[2]
        test_int = test[0] << 11 | test[1] << 7 | test[2]
        if action == r'<' and version_int < test_int:
            return True
        if action == r'<=' and version_int <= test_int:
            return True
        if action == r'>' and version_int > test_int:
            return True
        if action == r'>=' and version_int >= test_int:
            return True
        if not action or action == r'=':
            version_int = v_array[0] << 11 | v_array[1] << 7
            test_int = test[0] << 11 | test[1] << 7
            if version_int == test_int:
                return True
        return False

    def _get_php_error_tbl(self, php_ver):
        # http://php.net/manual/en/errorfunc.constants.php
        php_error_table = {
            1:     'E_ERROR',
            2:     'E_WARNING',
            4:     'E_PARSE',
            8:     'E_NOTICE',
            16:    'E_CORE_ERROR',
            32:    'E_CORE_WARNING',
            64:    'E_COMPILE_ERROR',
            128:   'E_COMPILE_WARNING',
            256:   'E_USER_ERROR',
            512:   'E_USER_WARNING',
            1024:  'E_USER_NOTICE',
            2048:  'E_STRICT'  # E_STRICT since PHP 5 but not included in E_ALL until PHP 5.4.0
        }
        if self._check_version('<5.2.0', php_ver):
            php_error_table[2047] = 'E_ALL'
        if self._check_version('>=5.2.0', php_ver):
            php_error_table[4096] = 'E_RECOVERABLE_ERROR'  # E_RECOVERABLE_ERROR since PHP 5.2.0
            if self._check_version('<5.3.0', php_ver):
                php_error_table[6143] = 'E_ALL'  # E_ALL 6143 in PHP 5.2.x
        if self._check_version('>=5.3.0', php_ver):
            php_error_table[8192] = 'E_DEPRECATED'        # E_DEPRECATED since PHP 5.3.0
            php_error_table[16384] = 'E_USER_DEPRECATED'  # E_USER_DEPRECATED since PHP 5.3.0
            if self._check_version('<5.4.0', php_ver):
                php_error_table[30719] = 'E_ALL'  # E_ALL 30719 in PHP 5.3.x
        if self._check_version('>=5.4.0', php_ver):
            php_error_table[32767] = 'E_ALL'  # E_ALL 32767 in PHP >= 5.4.x
        return php_error_table

    def _php_string2error(self, str_, php_ver):
        """
        Convert php error level 'error-reporting' from string to code
        http://php.net/manual/ru/function.error-reporting.php
        #>>> ClUserOptSelect(item='php')._php_string2error('E_ALL & ~E_NOTICE', '5.4')
        32759
        #>>> ClUserOptSelect(item='php')._php_string2error('E_USER_ERROR | E_NOTICE', '5.4')
        264
        #>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR | E_WARNING | E_PARSE | E_COMPILE_ERROR', '5.4')
        71
        #>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR | INCORRECT', '5.4')  # incorrect variable 'INCORRECT'
        None
        #>>> ClUserOptSelect(item='php')._php_string2error('E_ERROR + E_WARNING', '5.4')   # incorrect operator '+'
        None
        :param str: error_reporting variable
        :return None|int: error_reporting error code; return None if can't convert
        """
        VALID_SYMBOLS = '0123456789|&~!^ '  # http://php.net/manual/en/errorfunc.constants.php
        php_error_table = self._get_php_error_tbl(php_ver)
        # replacing all constants to the numbers
        for code, name in iteritems(php_error_table):
            str_ = str_.replace(name, str(code))

        # check if str_ has only valid symbols
        if set(str_).difference(set(VALID_SYMBOLS)):
            return None

        try:
            error_code = int(eval(str_))
        except (SyntaxError, ValueError, TypeError):
            return None
        return error_code

    def _get_error_desc(self, value, version, range_):
        if not re.match(r'^-?\d{1,5}$', value):  # error-reporting code must be from 32767 to -32767
            return ''
        desc = []
        value = int(value)
        for error_string in range_:
            if self._php_string2error(error_string, php_ver=version) == value:
                return error_string

        php_error_table = self._get_php_error_tbl(php_ver=version)
        for error in php_error_table:
            if (error & value) == error:
                desc.append(php_error_table[error])
        return r' | '.join(desc)

    def _get_ini_defaults(self, version):
        """
        Runs the PHP binary of the given version and stores the reported
        directive values as whitelist defaults
        @param version: string
        Raises NoSuchAlternativeVersion when the binary does not exist.
        """
        alternatives = self.get_all_alternatives_data()
        self._check_alternative(version, alternatives)
        whitelist = self._get_whitelist(version)
        php_binary = alternatives[version]['data'][self._item]
        if not os.path.isfile(php_binary):
            raise ClSelectExcept.NoSuchAlternativeVersion(version)

        # Work on a copy: the tweaks below are meant for the child process
        # only and must not leak into this process' own environment.
        # NOTE(review): assumes utils.run_command accepts a plain dict as env.
        env_data = dict(os.environ)
        if 'SCRIPT_FILENAME' in env_data:
            # NOTE(review): presumably a CGI context where the binary runs
            # the script named by SCRIPT_FILENAME - confirm
            script_path = '/usr/share/l.v.e-manager/utils/clinfo.php'
            if os.path.exists(script_path):
                env_data['SCRIPT_FILENAME'] = script_path
            cmd = [php_binary]
        else:
            cmd = [php_binary, '-qi']
            env_data.pop('SERVER_SOFTWARE', None)
        env_data['PHP_FCGI_MAX_REQUESTS'] = '1'
        env_data['PHP_FCGI_CHILDREN'] = '0'
        env_data['ACCEPT_ENCODING'] = ''
        env_data['HTTP_ACCEPT_ENCODING'] = ''
        # Table row with two cells and an optional third one
        tag_pattern = re.compile(
            r'<tr[^>]*?><td[^>]*>(.*?)</td><td[^>]*>(.*?)</td>(?:<td[^>]*>(.*?)</td>)?</tr>')
        strip_pattern = re.compile(r'<[^>]*?>')
        # Overrides go right after the binary, before any other argument
        cmd[1:1] = ['-d', 'opcache.enable_cli=0',
                    '-d', 'zlib.output_compression=Off',
                    '-d', 'auto_append_file=none',
                    '-d', 'auto_prepend_file=none']
        output = utils.run_command(cmd, env_data)

        for row in tag_pattern.findall(output):
            directive = strip_pattern.sub('', row[0])
            if directive not in whitelist:
                continue
            # The third cell wins when present, otherwise the second one;
            # markup is stripped and html entities are converted to characters
            raw_value = strip_pattern.sub('', (row[2] or row[1]))
            value = unescape(raw_value, self._html_unescape_table)
            if value == 'no value':
                # Keep a non-empty default already present in the whitelist
                if ('default' in whitelist[directive] and
                        whitelist[directive]['default'] != ""):
                    continue
                whitelist[directive]['default'] = ""
            else:
                if directive == 'error_reporting':
                    # Numeric level -> expression from the configured range
                    error_range = whitelist[directive]['range'].split(',')
                    value = self._get_error_desc(value, version, error_range)
                whitelist[directive]['default'] = value
        self._whitelist.update(whitelist)

    def _get_user_ini(self, user, version):
        """
        Reads the user's ini file and records every whitelisted
        directive found there as its current 'value'
        @param user: string
        @param version: string
        """
        # Make sure the whitelist is loaded before it is updated
        self._get_whitelist(version)
        ini_path = self._compose_user_ini_path(user, version)
        raw_lines, _extensions, _extensions_data = self._load_ini_contents(ini_path)
        user_values = self._prepare_options_data(raw_lines)
        for directive, value in user_values.items():
            try:
                self._whitelist[directive]['value'] = value
            except KeyError:
                # Directives outside the whitelist are ignored
                pass

    def _backup_settings(self, user, version, data, create=True):
        """
        Stores a copy of the user's option lines in
        <homedir>/.cl.selector/alt_php<version without dots>.cfg
        @param user: string
        @param version: string
        @param data: list of lines; 'no value' entries are rewritten in place
        @param create: bool, forwarded to _write_to_file
        """
        backup_dir = os.path.join(
            self._clpwd.get_homedir(user), '.cl.selector')
        if not os.path.isdir(backup_dir):
            try:
                clcaptain.mkdir(backup_dir)
            except (OSError, ExternalProgramFailed) as e:
                raise ClSelectExcept.UnableToSaveData(backup_dir, e)
        file_name = "alt_php%s.cfg" % version.replace('.', '')
        backup_file = os.path.join(backup_dir, file_name)
        # The 'no value' placeholder is stored as an empty directive value
        for pos, line in enumerate(data):
            pieces = line.split('=')
            if len(pieces) == 2 and pieces[1] == 'no value':
                data[pos] = pieces[0] + '='
        self._write_to_file(user, '\n'.join(data), backup_file, create)

    def backup_php_options(self, user):
        """
        Rewrites the backup files from the user's current ini files,
        one per alternative that has custom options
        @param user: string
        """
        self._check_user_in_cagefs(user)
        alternatives = self.get_all_alternatives_data()
        for version in alternatives:
            ini_path = self._compose_user_ini_path(user, version)
            raw_lines, _extensions, _extensions_data = self._load_ini_contents(ini_path)
            options_set = self._compose_options_set(
                self._prepare_options_data(raw_lines))
            if not options_set:
                # Nothing customised for this version; leave its backup alone
                continue
            self._backup_settings(
                user, version, self._wrap_options(options_set))